跳到主要内容

SpringWebFlux 使用

webflux 适用范围

主要适合 IO 密集型的任务,提高其并发量,但是对于响应速度而言和 Spring MVC 是一样的

而且,webflux 是不支持 JDBC 的

配置环境

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>

响应式 RestController

@RestController
@RequestMapping("/example")
public class ExampleController {
@GetMapping("/{id}")
private Mono<String> test(@PathVariable String id) {
return Mono.just("hello " + id);
}
}

测试阻塞操作

@Slf4j
@RestController
@RequestMapping("/example")
public class ExampleController {

@GetMapping("/{id}")
private Mono<String> test(@PathVariable String id) {
log.info("test start");
Mono<String> result = Mono.fromSupplier(() -> createStr());
log.info("test end");
return result;
}

private String createStr() {
try {
// 阻塞一段时间
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello ~~";
}
}

打印结果:

2021-08-29 14:50:28.998  INFO 8948 --- [ctor-http-nio-2] c.e.w.controller.ExampleController       : test start1630219828998
2021-08-29 14:50:28.999 INFO 8948 --- [ctor-http-nio-2] c.e.w.controller.ExampleController : test end1630219828999

可以看到并没有阻塞

各种工具类的概念

reactor 包是什么?

public class MainTest {
public static void main(String[] args) {
// 这个 webflux 包提供的 reactor 其实就等于 JDK8 的 Stream 加上 JDK9 的 Reactive Stream(响应流)
// Mono 等于 0-1 个元素
// Flux 等于 0-N 个元素


// 编写一个 JDK9 的 Reactive Stream 的那个处理器
Subscriber<Integer> subscriber = new Subscriber<Integer>() {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者已经达到了目标, 可以调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();

// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}

};

String[] strs = {"1", "2", "3"};

// 例如使用 reactor 包的这个 Flux
// 这里就是 JDK8 的 Stream 操作,对字符串转成 Integer 类型
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 然后最终操作 “订阅” 这里就是 JDK9 的 Reactive Stream 的那个订阅操作
.subscribe(subscriber);
}
}

打印的结果

接受到数据: 1
接受到数据: 2
接受到数据: 3
处理完了!

Mono 的使用

Mono 返回 0-1 个元素

@RestController
@RequestMapping("/example")
public class ExampleController {

@GetMapping("/{id}")
private Mono<String> test(@PathVariable String id) {
return Mono.just("hello " + id);
}
}

Flux 的使用

Flux 返回 0-N 个元素

让返回结果一条一条的返回

// 要通过指定 produces 来表示流的形式返回("text/event-stream")
@GetMapping(value = "/test2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> test2() {
Flux<String> result = Flux.fromStream(IntStream.range(1, 50).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "flux data -- " + i;
}));
return result;
}

注意要在浏览器上使用才能一条条的显示,IDEA 自带的那个还是会等到全部再显示

注意:这个一条条的返回是 HTTP 的 SSE 协议(server-sent events)

补充:前端要拿到 SSE 数据需要使用 EventSource 对象

var sse = new EventSource("SSE")
sse.onmessage = function(e) {
console.log("message", e.data)
}

处理 Mono 和 Flux(中间阶段)

中间阶段的 Mono 和 Flux 的方法主要有 filter、map、flatMap、then、zip、reduce 等。这些方法使用方法和 Stream 中的方法类似。

下面举几个 Reactor 开发实际项目的问题,帮大家理解这些方法的使用场景:

问题一: map、flatMap 和 then 在什么时候使用

本段内容将涉及到如下类和方法:

  • 方法:Mono.map()
  • 方法:Mono.flatMap()
  • 方法:Mono.then()
  • 类:Function

在 Mono 和 Flux 中间环节的处理过程中,有三个有些类似的方法:map()flatMap()then()。这三个方法的使用频率很高。

传统的命令式编程

Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);

对应的反应式编程

Mono.just(params)
.flatMap(v -> doStep1(v))
.flatMap(v -> doStep2(v))
.flatMap(v -> doStep3(v));

从上面两段代码的对比就可以看出来 flatMap() 方法在其中起到的作用,map()then() 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
map(Function<? super T, ? extends R> mapper)
then(Mono other)

then()

then() 看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。then() 方法的参数只是一个 Mono,无从接受上一步的执行结果。

flatMap()map() 的参数都是一个 Function,入参是上一步的执行结果。

所以 then() 就是让之前的 Mono 结束,返回一个新的 Mono

flatMap() 和 map()

主要研究一下 flux 的 map 与 flatMap 的区别

flatMap()map() 的区别在于,flatMap() 中的入参 Function 的返回值要求是一个 Mono 对象,而 map 的入参 Function 只要求返回一个 普通对象。

在业务处理中常需要调用 WebClient 或 ReactiveXxxRepository 中的方法,这些方法的 返回值 都是 Mono(或 Flux)。所以要将这些调用串联为一个整体 链式调用,就必须使用 flatMap(),而不是 map()

map 转换

map 是纯元素转换

@Test
public void testMap() throws InterruptedException {
Flux.just(1, 2, 3, 4)
.log()
.map(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i * 2;
})
.subscribe(e -> LOGGER.info("get:{}",e));
}

flatMap 转换

这里的 flatMap,将元素转为 Mono 或 Flux,转换操作里头还可以进行异步操作

@Test
public void testFlatMap() throws InterruptedException {
Flux.just(1,2,3,4)
.log()
.flatMap(e -> {
// 内部可以返回一个异步操作
return Flux.just(e * 2)
.delayElements(Duration.ofSeconds(1));
})
.subscribe(e -> LOGGER.info("get:{}",e));
TimeUnit.SECONDS.sleep(10);
}

异常处理操作

数据源:

Flux<String> stringFlux = Flux.just("str1" , "str2" , "str3" , "str4")
.map(item -> {
if(item.equals("str3")) throw new RuntimeException("mtk test error");
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
return item;
});

onErrorReturn

效果:捕获异常并返回一个固定的值

stringFlux.onErrorReturn(ex -> "mtk test error".equals(ex.getMessage()) , "mtk fallback value")
.onErrorReturn("hahahaha")
.subscribe(System.out::println);

结果:

str1
str2
mtk fallback value

由结果可以看到第二个 onErrorReturn("hahahaha") 并没有被执行,因为错误在第一个时被转换为一个字符串了

onErrorResume

效果:提供一个异常对象给你,要求你返回一个 Publisher 的实现类,即你可以根据该异常对象自定义需要返回给下游的东西,实际上大部分方法都是依靠本方法来实现的,比如 onErrorReturn、onErrorMap

stringFlux.onErrorResume(ex -> "mtk test error".equals(ex.getMessage()) , ex -> {
return Mono.just("error mono");
})
.subscribe(System.out::println);

结果:

str1
str2
error mono

onErrorMap

效果:将异常转换为一个新的异常

stringFlux.onErrorMap(ex -> new IllegalStateException("transform the ex"))
.onErrorResume(ex -> {
return Mono.just(ex.toString());
})
.subscribe(System.out::println);

结果:

str1
str2
java.lang.IllegalStateException: transform the ex

doOnError

提供一个异常对象,让你自己进行自定义操作,不需要返回。注意 doOnError 并不会把错误拦截消费掉,错误依然会流入下游,但是由于有 doOnError 的存在,异常信息并不会被显示报出,即不会在控制台打印出堆栈信息

stringFlux
.doOnError(ex -> System.out.println(ex.getMessage()))
.onErrorReturn("hahahaha")
.subscribe(System.out::println);
str1
str2
mtk test error
hahahaha

doFinally

提供一个 SignalType 对象给你,该对象表明从下游来的信号类型,比如 request、subscribe、cancel、onNext、onError 等等。注意是下游传来的信号!!!

示例1:

stringFlux
.doFinally(signalType -> {
if(signalType.equals(SignalType.CANCEL)){
System.out.println("cancel");
}else if(signalType.equals(SignalType.ON_ERROR)){
System.out.println("error");
}
})
.take(0)
.subscribe(System.out::println , System.out::println);

结果1:

cancel

示例2:

stringFlux
.doFinally(signalType -> {
if(signalType.equals(SignalType.CANCEL)){
System.out.println("cancel");
}else if(signalType.equals(SignalType.ON_ERROR)){
System.out.println("error");
}
})
.take(3)
.subscribe(System.out::println , System.out::println);

结果2:

str1
str2
java.lang.RuntimeException: mtk test error
error

do 操作符

do 系列操作符有多个,如 doOnNext,doOnSubscribe,doOnUnsubscribe,doOnCompleted,doOnError,doOnTerminate 和doOnEach。

当发布者每发送一个数据时,doOnNext 会被首先调用,然后再 onNext。若发射中途出现异常 doOnError 会被调用,然后 onError。若数据正常发送完毕 doOnCompleted 会被触发,然后执行 onCompleted。

当订阅或者解除订阅 doOnSubscribe,doOnUnsubscribe 会被执行。

Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "doOnNext: " );
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "doOnError: " );
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnCompleted: " );
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnSubscribe: " );
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnUnsubscribe: " );
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnTerminate: " );
}
})
.doAfterTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "doAfterTerminate: " );
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1: " + integer);
}
});

输出结果:

doOnSubscribe: 
doOnNext:
onNext1: 1
doOnNext:
onNext1: 2
doOnNext:
onNext1: 3
doOnCompleted:
doOnTerminate:
onCompleted1:
doOnUnsubscribe:
doAfterTerminate:

这里要特殊介绍到一个操作符——就是 doOnEach:

doOnEach 顾名思义,就是在执行观察者的每一个方法之前,都会先执行一遍 doOnNext。这里的 doOnEach 传入一个 Subscriber 参数,需要实现其三个对应方法,具体参照一下代码:

Observable.just(1,2,3)
.doOnEach(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}

@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1: " );
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1: " );
}

@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1: "+integer);
}
});

输出结果:

onNext: 1
onNext1: 1
onNext: 2
onNext1: 2
onNext: 3
onNext1: 3
onCompleted:
onCompleted1:

Reference

参考资料 Guide to Spring 5 WebFlux 参考资料 Reactor学习:四、异常处理 参考资料 聊聊Spring Reactor反应式编程